Skip to content

INT-4162: TCP/UDP DSL #1966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed

Conversation

garyrussell
Copy link
Contributor

JIRA: https://jira.spring.io/browse/INT-4162

INT-4162: IP DSL - Phase I Connection Factories

Phase 2 - Adapters/Gateways

Phase 3 - UDP

<S extends AbstractConnectionFactorySpec<S, C>, C extends AbstractConnectionFactory>
extends IntegrationComponentSpec<S, C> {

protected C target;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already target for the same purpose in the super IntegrationComponentSpec


protected C target;

public AbstractConnectionFactorySpec(C connectionFactory) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically I do all the Spec only visible for the factory methods to instantiate.
Is there any purpose to let this be created directly from end-user level?

@Override
protected C doGet() {
if (getId() != null) {
this.target.setBeanName(getId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getId() is fully orthogonal to the setId() . Since our target is populated from the ctor, it looks like we can directly call this setBeanName() from the id() and get rid of this doGet() altogether.
WDYT?

this(host, port, false);
}

public TcpClientConnectionFactorySpec(String host, int port, boolean nio) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be public?
Then why the previous one isn't?

* Create a server spec that uses NIO.
* @param port the port to listen on.
* @param <S> the spec type.
* @param <C> the connection factrory type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: factrory

/**
* Provides TCP/UDP Component support for the Java DSL.
*/
package org.springframework.integration.ip.dsl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New line in the end of each class.

}
if (this.serverConnectionFactory != null) {
this.serverConnectionFactory.stop(callback);
this.serverConnectionFactory.stop();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After changes to this method, the callback is out of use. I think we should call it in the end of the method anyway.
From it even possible to avoid code duplication since stop() without callback does the same now.

@Bean
public IntegrationFlow inTcpGateway() {
return IntegrationFlows.from(Tcp.inboundGateway(server1()))
.transform(new ObjectToStringTransformer())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit pick. We have Transformers factory.


@Bean
public IntegrationFlow outUdpAdapter() {
return f -> f.handle(Udp.outboundAdapter("headers['udp_dest']"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider to have Expression and Function factory method variants ?

* @since 5.0
*
*/
public class TcpInboundChannelAdapterSpec<S extends TcpInboundChannelAdapterSpec<S>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I see that this generic variant works well in the target code.
So, now I'm not sure if we should restrict with target type without generic modification or leave it with that and let to extend in the future easy...
But, eh., typically we don't do that.
And I'm not sure that we are going to extend TcpReceivingChannelAdapter class to have some TcpInboundChannelAdapterSpec extension as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I based these on the JMS ones - I didn't notice the subclasses there; fixing...

JIRA: https://jira.spring.io/browse/INT-4162

INT-4162: IP DSL - Phase I Connection Factories

Phase 2 - Adapters/Gateways

Phase 3 - UDP
Remove unnecessary generics.
@garyrussell
Copy link
Contributor Author

@artembilan One issue with this implementation is...

...from(Tcp.inbouncChannelAdapter(Tcp.server(1234).get())...

The connection factory is not a bean, so in the tests I used

...from(Tcp.inboundGateway(server1()))...

so the connection factory is initialized.

Any suggestions for making the first one work - if we inject a Spec instead of the result of the get() perhaps ??

@artembilan
Copy link
Member

The TcpInboundChannelAdapterSpec can populate provided ConnectionFactory to the ComponentsRegistration and DSL BPP will take care if it need to register a bean or such one is already there.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see ComponentsRegistration implementation.
Or you decided to stay away from that (at least for now) or have just missed my comment as I don't see email notification for your latest commit...

}

/**
* @param soKeepAlive the socket keepalive option.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a typo 😄 : keep alive ?

}

/**
* @param taskExecutor the task excecutor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: excecutor

import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;

/**
* An {@link IntegrationComponentSpec} for {@link AbstractClientConnectionFactory}s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDocs doesn't reflect reality of this class.

import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;

/**
* An {@link IntegrationComponentSpec} for {@link AbstractServerConnectionFactory}s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same JavaDocs problem here.

this.target.setConnectionFactory(connectionFactory);
}

public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind to elaborate why this hasn't been addressed?
Thanks

}

/**
* @param soSendBufferSize set the send bufffer size socket option.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: triple buffer for f 😄

* @return the spec.
* @see UnicastSendingMessageHandler#setSocketExpression(Expression)
*/
public UdpOutboundChannelAdapterSpec socketExpression(Expression socketExpression) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socketFunction() with wrapping to the FunctionExpression would be good, too.

@@ -99,6 +102,28 @@ public MulticastSendingMessageHandler(String address, int port,
super(address, port, lengthCheck, acknowledge, ackHost, ackPort, ackTimeout);
}

/**
* Construct UnicastSendingMessageHandler based on the destination SpEL expression to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/UnicastSendingMessageHandler/MulticastSendingMessageHandler

}

/**
* Construct UnicastSendingMessageHandler based on the destination SpEL expression to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

.handle(Tcp.outboundGateway(this.client1))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool observation for unit testing 😄

@artembilan
Copy link
Member

:spring-integration-ip:checkstyleMain[ant:checkstyle] [ERROR] /home/travis/build/spring-projects/spring-integration/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpOutboundChannelAdapterSpec.java:21:8: Unused import - org.springframework.expression.Expression. [UnusedImports]

😄

* @return the spec.
* @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression)
*/
public UdpOutboundChannelAdapterSpec socketExpression(Function<Message<?>, ?> socketExpressionFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's definitely socketFunction().
The fact transformation to a FunctionExpression is out of end-user API.

Plus this must be like Function<Message<?>, DatagramSocket>, just because:

if (this.socketExpression != null) {
    socket = this.socketExpression.getValue(this.evaluationContext, message, DatagramSocket.class);
}

therefore we restrict the target code with particular type and that is much easier to write the final lambda.

super();
}

UdpOutboundChannelAdapterSpec(String destinationExpression) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is UnicastSendingMessageHandler(Expression destinationExpression), I would like to see UdpOutboundChannelAdapterSpec(Function<Message<?>, Object> destinationFunction)


TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFactory) {
super(new TcpReceivingChannelAdapter());
this.connectionFactory = connectionFactory;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I see your point about ComponentsRegistration.
How about to have alternative ctor for the AbstractConnectionFactorySpec and populate for registration just only result of that?
Any AbstractConnectionFactory variant should be already supplied as a @Bean
WDYT?

I suggests this because I don't know how to answer to your concern in the https://github.com/spring-projects/spring-integration-java-dsl/issues/137 😄

Add ...Spec CTORs to avoid the issue described here:
https://github.com/spring-projects/spring-integration-java-dsl/issues/137

Also other PR comments.
* @return the spec.
* @see UnicastSendingMessageHandler#setSocketExpression(org.springframework.expression.Expression)
*/
public UdpOutboundChannelAdapterSpec socketExpression(Function<Message<?>, ?> socketExpressionFunction) {
this.target.setSocketExpression(new FunctionExpression<>(socketExpressionFunction));
public UdpOutboundChannelAdapterSpec socketExpression(Function<Message<?>, DatagramSocket> socketFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you agree with me that socketFunction() sounds better as a name for this kind of method ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I changed the argument, not the name.

See my comment on https://github.com/spring-projects/spring-integration-java-dsl/issues/137

Let me know if you think I should revert that part of the last commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can live with your current Spec-aware ctors. They feel much organic if we would like to inline ConnectionFactory instance.
That your comment isn't clear to me yet, but I'm fine to leave it as is.
We spent already enough time on this PR and it is very close for merge. No reason to revert if it doesn't hurt 😄 .
Let's see what the community says!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We spent already enough time on this PR

Thanks for your patience - this was my first complete module; hopefully my next commit will be the last.

this.target = new UnicastSendingMessageHandler(destinationExpression);
}

UdpOutboundChannelAdapterSpec(Function<Message<?>, ?> destinationFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ctor isn't covered in the Udp.
The same concern for the UdpMulticastOutboundChannelAdapterSpec.
Also I see a generic problem:

Udp.outboundMulticastAdapter("headers['udp_dest']")
                    .lengthCheck(true)
                    .timeToLive(10)

We can't reach an inheritors methods if we call the super.
So, this UdpOutboundChannelAdapterSpec must be with generic specification for itself. Exactly what you had before on the TcpInboundChannelAdapterSpec for example.
Or what we have on the JmsOutboundChannelAdapterSpec or AbstractConnectionFactorySpec.

@artembilan
Copy link
Member

The Travis failure isn't related to your work, but I wonder if you wouldn't mind taking a fresh look to that JdbcLockRegistryLeaderInitiatorTests.
Thanks

@garyrussell
Copy link
Contributor Author

It's too hard to debug travis problems without test artifacts - added a log adjuster to get info if it occurs on Bamboo.

@@ -25,24 +25,25 @@
import org.springframework.messaging.Message;

/**
* A {@link MessageHandlerSpec} for {@link UnicastSendingMessageHandler}s.
* A {@link MessageHandlerSpec} for UDB {@link org.springframework.messaging.MessageHandler}s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: UDP ?

*
* @param <S> the target {@link UdpUnicastOutboundChannelAdapterSpec} implementation type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/UdpUnicastOutboundChannelAdapterSpec/AbstractUdpOutboundChannelAdapterSpec

Udp.outboundMulticastAdapter("headers['udp_dest']")
.lengthCheck(true)
.timeToLive(10);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't look finished: no one assertion.
I understand the premise of this, but looks like a visual observation and compiler approval make a staff.
OTOH I understand also that there might be a case when we change the API and just don't have the code to prove a support for proper method chain.
Maybe it really would be good to finish the test for the valid MulticastSendingMessageHandler use-case?..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a compile-time test - before the commit it wouldn't compile. I didn't think it was necessary to run a runtime test; feel free to delete it on merge it you want.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'll just add in the end an assertion that we have proper instanceOf after Spec evaluation.

@artembilan
Copy link
Member

According the latest Travis report we have some problem in the MongoDbInboundChannelAdapterIntegrationTests.
I think there is a race condition with all those fixed-rate in config.
And our testWithOnSuccessDisposition() just received two message for the same document. Just because the poller is able to start a new polling task because the previous hasn't finished during expected fixed-rate="200".

Right, that isn't task of your PR, but does it matter when and how do we fix it ?

Your thoughts ?
Thanks

@artembilan
Copy link
Member

Merged as 65ec234

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants